# Image De-Duplication
# Author: Robert Swetland
# Date: 2025
# Prerequisites: Python 3
# Third-party Python packages: OpenCV (opencv-python), NumPy, scikit-learn, ImageHash, Pillow (PIL)
# Standard-library modules used: pathlib, argparse, collections, logging, time, xml.etree.ElementTree, hashlib, sys
# Required libraries
# In a terminal window paste the following
# pip install opencv-python numpy scikit-learn imagehash Pillow
# To run the script paste the following into a terminal window
# Specify the target folder as well as the output log
# python filecompare.py "Path\To\Image|files" --threshold 0.95 --output my_results.txt
# The threshold parameter (0.0 to 1.0) controls how similar two raster
# images must be before they are marked as duplicates; a higher value
# requires closer matches. SVG files are compared by exact normalized
# content instead, so the threshold does not apply to them.
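# Example invocations (illustrative folder names):
# python filecompare.py "C:\Photos" --threshold 0.98   (strict: only near-identical raster images grouped)
# python filecompare.py "C:\Photos" --threshold 0.85   (loose: more visually similar raster images grouped)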
import cv2
import numpy as np
from pathlib import Path
from sklearn.metrics.pairwise import cosine_similarity
import imagehash
from PIL import Image
import argparse
from collections import defaultdict
import logging
import time
import xml.etree.ElementTree as ET
import hashlib
# Set up logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
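# With this configuration, log lines look like:
#   2025-01-01 12:00:00 - INFO - Total valid images found: 42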
def normalize_svg(file_path):
"""
Read and normalize SVG content for comparison.
Removes whitespace and normalizes attribute ordering.
Args:
file_path: Path to the SVG file
Returns:
Normalized string content of the SVG
"""
try:
# Parse SVG file
tree = ET.parse(file_path)
root = tree.getroot()
# Function to sort attributes
def sort_attributes(elem):
for child in elem:
sort_attributes(child)
# Sort attributes by key
attrib = dict(sorted(elem.attrib.items()))
elem.attrib.clear()
elem.attrib.update(attrib)
# Normalize the XML structure
sort_attributes(root)
# Convert back to string and remove whitespace
content = ET.tostring(root, encoding='unicode')
content = ''.join(content.split())
return content
except Exception as e:
logging.error(f"Error processing SVG {file_path}: {str(e)}")
return None
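# Illustrative example: two SVG files that differ only in whitespace or in
# attribute ordering, e.g.
#   <svg width="10" height="10">...</svg>
#   <svg  height="10" width="10" > ... </svg>
# normalize to the same string, so their content fingerprints (below) match.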
def compute_image_features(image_path):
"""
Compute features for an image file, handling both raster images and SVGs.
Args:
image_path: Path to the image file
Returns:
Dictionary containing computed features or None if image cannot be processed
"""
try:
if image_path.suffix.lower() == '.svg':
logging.debug(f"Processing SVG file: {image_path}")
content = normalize_svg(image_path)
if content is None:
return None
            # Fingerprint the normalized content (MD5 here is a fast
            # content fingerprint, not a security measure)
content_hash = hashlib.md5(content.encode()).hexdigest()
return {
'type': 'svg',
'content_hash': content_hash,
'content': content
}
else:
logging.debug(f"Processing raster image: {image_path}")
            # Raster images: perceptual hashes plus a colour histogram
pil_img = Image.open(image_path)
            # Three distinct perceptual hashes; compare_features() treats
            # any one of them matching as a duplicate signal.
            phash = str(imagehash.phash(pil_img))
            ahash = str(imagehash.average_hash(pil_img))
dhash = str(imagehash.dhash(pil_img))
cv_img = cv2.imread(str(image_path))
if cv_img is None:
logging.warning(f"Failed to read image with OpenCV: {image_path}")
return None
cv_img = cv2.cvtColor(cv_img, cv2.COLOR_BGR2RGB)
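            # 3-D colour histogram over RGB, 8 bins per channel
            # (8 * 8 * 8 = 512 values), normalised so that cosine
            # similarity between histograms is meaningful.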
hist = cv2.calcHist([cv_img], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
hist = cv2.normalize(hist, hist).flatten()
return {
'type': 'raster',
'phash': phash,
'ahash': ahash,
'dhash': dhash,
'histogram': hist
}
except Exception as e:
logging.error(f"Error processing image {image_path}: {str(e)}")
return None
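# Shape of the returned feature dictionaries (values illustrative):
#   raster: {'type': 'raster', 'phash': '...', 'ahash': '...', 'dhash': '...',
#            'histogram': <512-element float array>}
#   svg:    {'type': 'svg', 'content_hash': '<md5 hex digest>',
#            'content': '<normalized svg string>'}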
def compare_features(features1, features2, similarity_threshold=0.95):
"""
Compare two sets of image features.
Args:
features1: First feature set
features2: Second feature set
similarity_threshold: Threshold for considering images as duplicates
Returns:
Boolean indicating if images are considered duplicates
"""
# If both are SVGs, compare their normalized content
if features1['type'] == 'svg' and features2['type'] == 'svg':
return features1['content_hash'] == features2['content_hash']
# If one is SVG and one is raster, they're not duplicates
if features1['type'] != features2['type']:
return False
    # For raster images, combine perceptual-hash equality with
    # colour-histogram similarity
hash_match = (
features1['phash'] == features2['phash'] or
features1['ahash'] == features2['ahash'] or
features1['dhash'] == features2['dhash']
)
hist_similarity = cosine_similarity(
features1['histogram'].reshape(1, -1),
features2['histogram'].reshape(1, -1)
)[0][0]
return hash_match or hist_similarity > similarity_threshold
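# Minimal usage sketch (assumes two existing files a.png and b.png):
#   f1 = compute_image_features(Path('a.png'))
#   f2 = compute_image_features(Path('b.png'))
#   if f1 and f2 and compare_features(f1, f2, 0.95):
#       print('a.png and b.png look like duplicates')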
def find_duplicates(folder_path, similarity_threshold=0.95):
"""
Find duplicate images in the specified folder and all its subfolders.
Handles both raster images and SVG files.
Args:
folder_path: Path to the root folder containing images
similarity_threshold: Threshold for considering images as duplicates (0.0 to 1.0)
Returns:
Dictionary mapping group IDs to lists of duplicate image paths
"""
start_time = time.time()
try:
        # Convert to an absolute path and resolve symlinks and
        # relative components
folder_path = Path(folder_path).absolute().resolve()
if not folder_path.exists():
logging.error(f"Folder not found: {folder_path}")
return {}
logging.info(f"Resolved path: {folder_path}")
except Exception as e:
logging.error(f"Error processing folder path: {str(e)}")
return {}
# Collect all image files
logging.info("Collecting image files from all folders...")
image_files = []
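    # Note: rglob patterns are case-sensitive on case-sensitive filesystems,
    # so uppercase extensions (e.g. *.JPG) would need their own patterns there.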
supported_formats = ('*.jpg', '*.jpeg', '*.png', '*.bmp', '*.gif', '*.svg')
for ext in supported_formats:
found_files = list(folder_path.rglob(ext))
# Process each file path
valid_files = []
for file_path in found_files:
processed_path = process_file_path(file_path)
if processed_path:
valid_files.append(processed_path)
image_files.extend(valid_files)
logging.info(f"Found {len(valid_files)} valid files with extension {ext}")
logging.info(f"Total valid images found: {len(image_files)}")
# Compute features for all images
features_dict = {}
logging.info("Computing image features...")
for i, img_path in enumerate(image_files, 1):
logging.info(f"Processing image {i}/{len(image_files)}: {img_path}")
features = compute_image_features(img_path)
if features is not None:
features_dict[img_path] = features
# Find duplicates
duplicate_groups = defaultdict(list)
processed_images = set()
logging.info("Starting duplicate detection...")
total_comparisons = len(features_dict) * (len(features_dict) - 1) // 2
comparison_count = 0
    items = list(features_dict.items())
    for i, (img1_path, features1) in enumerate(items):
        if img1_path in processed_images:
            continue
        current_group = {img1_path}
        # Compare only against images later in the list so each pair is
        # examined at most once, matching the total_comparisons estimate above.
        for img2_path, features2 in items[i + 1:]:
            if img2_path in processed_images:
                continue
            comparison_count += 1
            if comparison_count % 1000 == 0:
                progress = (comparison_count / total_comparisons) * 100
                logging.info(f"Progress: {progress:.1f}% ({comparison_count}/{total_comparisons} comparisons)")
            if compare_features(features1, features2, similarity_threshold):
                current_group.add(img2_path)
        if len(current_group) > 1:
            group_id = len(duplicate_groups)
            duplicate_groups[group_id] = list(current_group)
            processed_images.update(current_group)
elapsed_time = time.time() - start_time
logging.info(f"Processing completed in {elapsed_time:.2f} seconds")
logging.info(f"Found {len(duplicate_groups)} groups of duplicate images")
return duplicate_groups
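# Shape of the returned mapping (paths illustrative):
#   {0: ['C:/pics/a.jpg', 'C:/pics/copy_of_a.jpg'],
#    1: ['C:/pics/logo.svg', 'C:/pics/old/logo.svg']}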
def sanitize_path(path_str):
"""
Sanitize and validate a file path.
Handles special characters including @, spaces, and unicode characters.
Args:
path_str: String representation of the path
Returns:
Pathlib Path object or None if invalid
"""
try:
# Convert to Path object and resolve to absolute path
path = Path(path_str).absolute().resolve()
# Check if path contains special characters
if '@' in str(path):
logging.info(f"Path contains @ symbol: {path}")
if not path.exists():
logging.error(f"Path does not exist: {path}")
return None
return path
except Exception as e:
logging.error(f"Invalid path: {path_str} - Error: {str(e)}")
return None
def process_file_path(file_path):
"""
Process and validate an individual file path.
Args:
file_path: Path object for the file
Returns:
Validated Path object or None if invalid
"""
try:
if '@' in str(file_path):
logging.debug(f"Processing file with @ symbol: {file_path}")
# Check if file actually exists
if not file_path.is_file():
logging.warning(f"File not found: {file_path}")
return None
return file_path
except Exception as e:
logging.error(f"Error processing file path: {file_path} - {str(e)}")
return None
def write_results_to_file(duplicate_groups, output_file="duplicate_results.txt"):
"""
Write duplicate detection results to a text file.
Args:
duplicate_groups: Dictionary of duplicate groups
output_file: Path to output file (default: duplicate_results.txt)
"""
try:
with open(output_file, 'w', encoding='utf-8') as f:
if not duplicate_groups:
f.write("No duplicate images found.\n")
return
f.write(f"Found {len(duplicate_groups)} groups of duplicate images\n")
f.write("=" * 50 + "\n\n")
for group_id, group in duplicate_groups.items():
f.write(f"Group {group_id + 1}:\n")
for img_path in group:
f.write(f" {img_path}\n")
f.write("\n")
logging.info(f"Results written to: {output_file}")
except Exception as e:
logging.error(f"Error writing results to file: {str(e)}")
def main():
"""
Main function that handles command-line arguments and runs the duplicate detection.
Handles paths with spaces and special characters.
"""
import sys
# Pre-process arguments to handle unquoted paths with spaces
args_list = sys.argv[1:]
if args_list:
# Find the first flag (starting with --) or end of list
first_flag_index = next((i for i, arg in enumerate(args_list) if arg.startswith('--')), len(args_list))
# Join all arguments before the first flag as a single path
if first_flag_index > 0:
path_parts = args_list[:first_flag_index]
joined_path = ' '.join(path_parts)
args_list = [joined_path] + args_list[first_flag_index:]
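    # Example of the pre-processing above: the unquoted command line
    #   python filecompare.py E:\Files\My Photos --threshold 0.95
    # arrives as ['E:\Files\My', 'Photos', '--threshold', '0.95'] and is
    # rejoined into ['E:\Files\My Photos', '--threshold', '0.95'].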
parser = argparse.ArgumentParser(
description='Find duplicate images (including SVGs) in a folder and its subfolders',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Examples:
# Path with spaces and special characters (either format works):
python filecompare.py "E:\Files\My Photos\@ New Folder" --threshold 0.95
python filecompare.py E:\Files\My Photos\@ New Folder --threshold 0.95
# Simple path:
python filecompare.py E:\Files\Photos --threshold 0.95
""")
parser.add_argument('folder_path',
help='Path to the root folder containing images')
parser.add_argument('--threshold', type=float, default=0.95,
help='Similarity threshold (0.0 to 1.0) for raster images')
parser.add_argument('--output', type=str, default='duplicate_results.txt',
help='Output file path for results (default: duplicate_results.txt)')
parser.add_argument('--debug', action='store_true',
help='Enable debug logging')
    # argparse prints its own usage message and exits via SystemExit on
    # invalid input, so no extra error handling is needed here
    # (argparse.ArgumentError is not raised out of parse_args).
    args = parser.parse_args(args_list)
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
logging.info(f"Starting duplicate image detection in: {args.folder_path}")
logging.info(f"Similarity threshold: {args.threshold}")
duplicate_groups = find_duplicates(args.folder_path, args.threshold)
# Write results to file and display them
write_results_to_file(duplicate_groups, args.output)
# Also display results in console
if not duplicate_groups:
logging.info("No duplicate images found.")
return
print("\nFound duplicate groups:")
for group_id, group in duplicate_groups.items():
print(f"\nGroup {group_id + 1}:")
for img_path in group:
print(f" {img_path}")
logging.info("Duplicate detection completed successfully")
if __name__ == "__main__":
main()